package org.ykx.demo.streaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.Seconds
import org.apache.spark.rdd.RDD
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.HasOffsetRanges

object StreamingKafkaCheckpoint {
  // Defaults used when an interval argument is missing or invalid.
  private val DefaultBatchIntervalSeconds = 5
  private val DefaultCheckpointIntervalSeconds = 5

  // Positional command-line layout consumed by getParams (every argument is optional).
  private val Usage = "params: checkpointDirectory topic batInterver checkpointInterver brockers"

  // Runtime configuration; overwritten by getParams from the command-line arguments.
  private var checkpointDirectory = "/user/StreamingKafkaCheckpoint/checkpoint"
  private var brockers = "10.10.61.193:6667"
  private var topic = "TMR"
  private var batInterver: Int = DefaultBatchIntervalSeconds
  private var checkpointInterver: Int = DefaultCheckpointIntervalSeconds

  /**
   * Entry point: parses the arguments, then starts a checkpointed Kafka direct stream and blocks
   * until the streaming context terminates.
   *
   * @param args optional positional arguments: checkpointDirectory topic batInterver checkpointInterver brockers
   */
  def main(args: Array[String]): Unit = {
    getParams(args)
    // With checkpointing, the StreamingContext must be obtained via StreamingContext.getOrCreate.
    // The checkpoint directory should be an HDFS directory.
    // Per the Spark Streaming guide, when checkpoint data already exists the context is recovered from it
    // and the factory below is not invoked, so changed topic/broker/interval arguments do not take effect.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () =>
      functionToCreateContext(topic, brockers, checkpointDirectory, batInterver, checkpointInterver))
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Builds a new StreamingContext that reads `topic` from Kafka through a direct stream and prints,
   * for every batch, the offset range and record count of each partition.
   *
   * Used as the factory passed to StreamingContext.getOrCreate.
   *
   * @param topic               Kafka topic to subscribe to
   * @param brockers            value used for Kafka's "metadata.broker.list"
   * @param checkpointDirectory checkpoint directory; checkpointing is enabled only when non-empty
   * @param batInterver         batch interval in seconds
   * @param checkpointInterver  DStream checkpoint interval in seconds
   * @return the configured, not-yet-started StreamingContext
   */
  def functionToCreateContext(topic: String, brockers: String, checkpointDirectory: String, batInterver: Int, checkpointInterver: Int): StreamingContext = {
    // NOTE(review): allowMultipleContexts bypasses Spark's single-active-SparkContext check; only one
    // context is created per run here, so this looks unnecessary — consider removing.
    val conf = new SparkConf().setAppName("StreamingKafka").set("spark.driver.allowMultipleContexts", "true")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(batInterver))
    if (!checkpointDirectory.isEmpty()) {
      ssc.checkpoint(checkpointDirectory)
    }

    // With checkpointing, KafkaUtils.createDirectStream must be called (directly or indirectly) inside this
    // function; otherwise StreamingContext.start reports an "uninitialized" error.
    // NOTE(review): "serializer.class" looks like a producer-side setting; this stream decodes with
    // StringDecoder, so the entry is probably unused — confirm before removing.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brockers, "serializer.class" -> "kafka.serializer.StringEncoder")
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))
    kafkaStream.checkpoint(Seconds(checkpointInterver))

    kafkaStream.foreachRDD { rdd =>
      // The HasOffsetRanges cast only works on the RDD handed out directly by the direct stream,
      // not on one derived from it by further transformations.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { range =>
        println(s"${range.fromOffset}====${range.untilOffset}  ${range.untilOffset - range.fromOffset}")
      }
    }

    ssc
  }

  /**
   * Overwrites the configuration fields from positional command-line arguments and prints the result.
   *
   * Argument order: checkpointDirectory topic batInterver checkpointInterver brockers. Missing trailing
   * arguments keep their current values. Interval arguments that are not positive integers are replaced
   * by their defaults instead of aborting the job.
   *
   * @param args positional command-line arguments
   * @return always true
   */
  def getParams(args: Array[String]): Boolean = {
    println(Usage)

    if (args.length > 0) {
      checkpointDirectory = args(0)
    }
    if (args.length > 1) {
      topic = args(1)
    }
    if (args.length > 2) {
      batInterver = parsePositiveInt(args(2), DefaultBatchIntervalSeconds)
    }
    if (args.length > 3) {
      checkpointInterver = parsePositiveInt(args(3), DefaultCheckpointIntervalSeconds)
    }
    if (args.length > 4) {
      brockers = args(4)
    }

    // Spark rejects a DStream checkpoint interval that is not a multiple of the batch interval when the
    // context starts; warn up front so the cause is obvious. batInterver is always > 0 at this point.
    if (checkpointInterver % batInterver != 0) {
      println(s"warning: checkpointInterver=$checkpointInterver is not a multiple of batInterver=$batInterver")
    }

    println(s"checkpointDirectory=$checkpointDirectory topic=$topic batInterver=$batInterver checkpointInterver=$checkpointInterver brockers=$brockers")
    true
  }

  /**
   * Parses `raw` as a strictly positive Int.
   *
   * @param raw      the argument text (surrounding whitespace is ignored)
   * @param fallback value returned, after printing a notice, when `raw` is not a positive integer
   * @return the parsed value, or `fallback`
   */
  private def parsePositiveInt(raw: String, fallback: Int): Int =
    scala.util.Try(raw.trim.toInt).toOption.filter(_ > 0).getOrElse {
      println(s"input invalid interver, use $fallback instead")
      fallback
    }
}